/*
 * Unidata Platform Community Edition
 * Copyright (c) 2013-2020, UNIDATA LLC, All rights reserved.
 * This file is part of the Unidata Platform Community Edition software.
 *
 * Unidata Platform Community Edition is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Unidata Platform Community Edition is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <https://www.gnu.org/licenses/>.
 */

package org.unidata.mdm.dq.data.service.segments.records.upsert;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.unidata.mdm.core.service.MetaModelService;
import org.unidata.mdm.core.type.model.EntityElement;
import org.unidata.mdm.data.configuration.DataNamespace;
import org.unidata.mdm.data.context.DataContextFlags;
import org.unidata.mdm.data.context.UpsertRequestContext;
import org.unidata.mdm.data.type.data.UpsertAction;
import org.unidata.mdm.data.type.keys.RecordKeys;
import org.unidata.mdm.dq.core.configuration.DataQualityDescriptors;
import org.unidata.mdm.dq.core.type.model.instance.NamespaceAssignmentElement;
import org.unidata.mdm.dq.data.context.RecordQualityContext;
import org.unidata.mdm.dq.data.dto.RecordUpsertQualityResult;
import org.unidata.mdm.dq.data.module.DataQualityDataModule;
import org.unidata.mdm.meta.configuration.Descriptors;
import org.unidata.mdm.system.service.ExecutionService;
import org.unidata.mdm.system.type.namespace.NameSpace;
import org.unidata.mdm.system.type.pipeline.Connector;
import org.unidata.mdm.system.type.pipeline.Pipeline;
import org.unidata.mdm.system.type.pipeline.Start;

/**
 * Connector segment, which runs data quality processing for a record upsert.
 * It resolves the data quality namespace assignment for the entity being upserted,
 * and, if the entity is assigned, builds a {@link RecordQualityContext}
 * from the upsert context and hands it over to the {@link ExecutionService}.
 *
 * @author Mikhail Mikhailov on Nov 24, 2019
 */
@Component(RecordUpsertQualityConnectorExecutor.SEGMENT_ID)
public class RecordUpsertQualityConnectorExecutor extends Connector<UpsertRequestContext, RecordUpsertQualityResult> {
    /**
     * This segment ID.
     */
    public static final String SEGMENT_ID = DataQualityDataModule.MODULE_ID + "[RECORD_UPSERT_QUALITY_CONNECTOR]";
    /**
     * Localized message code.
     */
    public static final String SEGMENT_DESCRIPTION = DataQualityDataModule.MODULE_ID + ".record.upsert.quality.connector.description";
    /**
     * The meta model service instance, used to look up both the data model element
     * and the data quality namespace assignment.
     */
    @Autowired
    private MetaModelService metaModelService;
    /**
     * The execution service, which runs the prepared record quality context.
     */
    @Autowired
    private ExecutionService executionService;
    /**
     * Constructor.
     * Registers the segment under {@link #SEGMENT_ID} with the description code {@link #SEGMENT_DESCRIPTION}.
     */
    public RecordUpsertQualityConnectorExecutor() {
        super(SEGMENT_ID, SEGMENT_DESCRIPTION);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public RecordUpsertQualityResult connect(UpsertRequestContext ctx) {
        return execute(ctx, null);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public RecordUpsertQualityResult connect(UpsertRequestContext ctx, Pipeline p) {
        return execute(ctx, p);
    }
    /**
     * Runs data quality processing for the record being upserted.
     * @param ctx the upsert context
     * @param p the pipeline (not used by this connector)
     * @return the result of the quality execution, or null, if the upsert action is
     * {@link UpsertAction#NO_ACTION}, the entity or its namespace cannot be resolved,
     * or no data quality assignment exists for the entity
     */
    public RecordUpsertQualityResult execute(@Nonnull UpsertRequestContext ctx, @Nullable Pipeline p) {

        // Nothing was (or will be) changed - nothing to check.
        if (ctx.upsertAction() == UpsertAction.NO_ACTION) {
            return null;
        }

        RecordKeys keys = ctx.keys();
        if (keys == null) {
            return null;
        }

        String entityName = keys.getEntityName();
        EntityElement el = metaModelService.instance(Descriptors.DATA)
            .getElement(entityName);

        // Unknown entity or an element, that is neither a lookup nor a register:
        // there is no namespace to ask an assignment for.
        NameSpace target = resolveNamespace(el);
        if (target == null) {
            return null;
        }

        NamespaceAssignmentElement nae = metaModelService.instance(DataQualityDescriptors.DQ)
            .getAssignment(target);

        if (nae == null || !nae.isAssigned(entityName)) {
            return null;
        }

        RecordQualityContext in = RecordQualityContext.builder()
                .assignment(nae)
                .payload(ctx)
                .operationId(ctx.getOperationId())
                .build();

        // Carry the upsert state over to the quality context.
        in.changeSet(ctx.changeSet());
        in.currentTimeline(ctx.currentTimeline());
        in.modificationBox(ctx.modificationBox());
        in.nextTimeline(ctx.nextTimeline());
        in.operationType(ctx.operationType());
        in.timestamp(ctx.timestamp());
        in.setFlag(DataContextFlags.FLAG_BATCH_OPERATION, ctx.isBatchOperation());

        return executionService.execute(in);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public boolean supports(Start<?, ?> start) {
        return UpsertRequestContext.class.isAssignableFrom(start.getInputTypeClass());
    }
    /**
     * Selects the data namespace for the given model element.
     * @param el the element, possibly null
     * @return {@link DataNamespace#LOOKUP} for lookups, {@link DataNamespace#REGISTER} for registers,
     * null for a null element or an element of any other kind
     */
    @Nullable
    private static NameSpace resolveNamespace(@Nullable EntityElement el) {

        if (el == null) {
            return null;
        }

        if (el.isLookup()) {
            return DataNamespace.LOOKUP;
        }

        if (el.isRegister()) {
            return DataNamespace.REGISTER;
        }

        return null;
    }
}
